using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using HslCommunication.Core.Net;

namespace HslCommunication.Enthernet.Redis
{
	/// <summary>
	/// Redis协议的订阅操作，一个对象订阅一个或是多个频道的信息
	/// </summary>
	public class RedisSubscribe : NetworkXBase
	{
		private IPEndPoint endPoint;

		private string[] keyWords = null;

		private Action<string, string> action;

		private int reconnectTime = 10000;

		/// <summary>
		/// 如果Redis服务器设置了密码，此处就需要进行设置。必须在CreatePush方法调用前设置
		/// </summary>
		public string Password
		{
			get;
			set;
		}

		/// <summary>
		/// 实例化一个发布订阅类的客户端，需要指定ip地址，端口，及订阅关键字
		/// </summary>
		/// <param name="ipAddress">服务器的IP地址</param>
		/// <param name="port">服务器的端口号</param>
		/// <param name="keys">订阅关键字</param>
		public RedisSubscribe(string ipAddress, int port, string[] keys)
		{
			endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
			keyWords = keys;
			if (keys == null)
			{
				throw new Exception(StringResources.Language.KeyIsNotAllowedNull);
			}
		}

		/// <summary>
		/// 实例化一个发布订阅类的客户端，需要指定ip地址，端口，及订阅关键字
		/// </summary>
		/// <param name="ipAddress">服务器的IP地址</param>
		/// <param name="port">服务器的端口号</param>
		/// <param name="key">订阅关键字</param>
		public RedisSubscribe(string ipAddress, int port, string key)
		{
			endPoint = new IPEndPoint(IPAddress.Parse(ipAddress), port);
			keyWords = new string[1]
			{
				key
			};
			if (string.IsNullOrEmpty(key))
			{
				throw new Exception(StringResources.Language.KeyIsNotAllowedNull);
			}
		}

		private OperateResult CreatePush()
		{
			CoreSocket?.Close();
			OperateResult<Socket> operateResult = CreateSocketAndConnect(endPoint, 5000);
			if (!operateResult.IsSuccess)
			{
				return operateResult;
			}
			if (!string.IsNullOrEmpty(Password))
			{
				OperateResult operateResult2 = Send(operateResult.Content, RedisHelper.PackStringCommand(new string[2]
				{
					"AUTH",
					Password
				}));
				if (!operateResult2.IsSuccess)
				{
					return operateResult2;
				}
				OperateResult<byte[]> operateResult3 = ReceiveRedisCommand(operateResult.Content);
				if (!operateResult3.IsSuccess)
				{
					return operateResult3;
				}
				string @string = Encoding.UTF8.GetString(operateResult3.Content);
				if (!@string.StartsWith("+OK"))
				{
					return new OperateResult(@string);
				}
			}
			List<string> list = new List<string>();
			list.Add("SUBSCRIBE");
			list.AddRange(keyWords);
			OperateResult operateResult4 = Send(operateResult.Content, RedisHelper.PackStringCommand(list.ToArray()));
			if (!operateResult4.IsSuccess)
			{
				return operateResult4;
			}
			CoreSocket = operateResult.Content;
			try
			{
				operateResult.Content.BeginReceive(new byte[0], 0, 0, SocketFlags.None, ReceiveCallBack, operateResult.Content);
			}
			catch (Exception ex)
			{
				return new OperateResult(ex.Message);
			}
			return OperateResult.CreateSuccessResult();
		}

		private void ReceiveCallBack(IAsyncResult ar)
		{
			Socket socket = ar.AsyncState as Socket;
			if (socket == null)
			{
				return;
			}
			try
			{
				int num = socket.EndReceive(ar);
			}
			catch (ObjectDisposedException)
			{
				return;
			}
			catch (Exception ex2)
			{
				SocketReceiveException(ex2);
				return;
			}
			OperateResult<byte[]> operateResult = ReceiveRedisCommand(socket);
			if (!operateResult.IsSuccess)
			{
				SocketReceiveException(null);
				return;
			}
			try
			{
				socket.BeginReceive(new byte[0], 0, 0, SocketFlags.None, ReceiveCallBack, socket);
			}
			catch (Exception ex3)
			{
				SocketReceiveException(ex3);
				return;
			}
			OperateResult<string[]> stringsFromCommandLine = RedisHelper.GetStringsFromCommandLine(operateResult.Content);
			if (!stringsFromCommandLine.IsSuccess)
			{
				base.LogNet?.WriteWarn(stringsFromCommandLine.Message);
			}
			else if (!(stringsFromCommandLine.Content[0].ToUpper() == "SUBSCRIBE"))
			{
				if (stringsFromCommandLine.Content[0].ToUpper() == "MESSAGE")
				{
					action?.Invoke(stringsFromCommandLine.Content[1], stringsFromCommandLine.Content[2]);
				}
				else
				{
					base.LogNet?.WriteWarn(stringsFromCommandLine.Content[0]);
				}
			}
		}

		private void SocketReceiveException(Exception ex)
		{
			do
			{
				if (ex != null)
				{
					base.LogNet?.WriteException("Offline", ex);
				}
				Console.WriteLine(StringResources.Language.ReConnectServerAfterTenSeconds);
				Thread.Sleep(reconnectTime);
			}
			while (!CreatePush().IsSuccess);
			Console.WriteLine(StringResources.Language.ReConnectServerSuccess);
		}

		/// <summary>
		/// 创建数据推送服务
		/// </summary>
		/// <param name="pushCallBack">触发数据推送的委托</param>
		/// <returns>是否创建成功</returns>
		public OperateResult CreatePush(Action<string, string> pushCallBack)
		{
			action = pushCallBack;
			return CreatePush();
		}

		/// <summary>
		/// 关闭消息推送的界面
		/// </summary>
		public void ClosePush()
		{
			action = null;
			CoreSocket?.Close();
		}

		/// <inheritdoc />
		public override string ToString()
		{
			return $"RedisSubscribe[{endPoint}]";
		}
	}
}
